您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15231|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;( x, U5 T6 }+ K- L1 [( s# e
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    2 _. q. k0 r( O6 Z2 ]' L8 W! w4 D
  3. 3 I7 s4 g# y. ^: L( ^
  4. $worker = new Worker();- z# L" k9 O  \8 C- D$ Z% u" {
  5. // 4个进程
    8 M. ?2 g: g0 B( p6 D
  6. $worker->count = 4;( J8 n" l" S7 h3 m7 U
  7. // 每个进程启动后在当前进程新增一个Worker监听
    8 W2 x: F; E- V4 w* l
  8. $worker->onWorkerStart = function($worker)
    * O3 t+ F8 B" B. D4 S
  9. {
    1 y4 \  r; A! {& M, N8 w
  10.     /**
    1 f7 o0 ?; t$ O8 }  _
  11.      * 4个进程启动的时候都创建2016端口的Worker
    ) k1 N: f8 t6 ^! o) ?, G& f: M5 `/ T8 c
  12.      * 当执行到worker->listen()时会报Address already in use错误' Q" y2 ]1 D7 ^/ O0 x+ M6 E4 }& N
  13.      * 如果worker->count=1则不会报错5 p- X2 m1 A& J$ L3 C3 C9 m6 Q
  14.      */
    . z& n8 m# f5 R' h
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');5 T- t4 ?% U2 f, s- w3 \
  16.     $inner_worker->onMessage = 'on_message';
    5 d/ T1 l7 T: x
  17.     // 执行监听。这里会报Address already in use错误7 c  S! M1 Z* f
  18.     $inner_worker->listen();
      d2 l( ^6 @( E* L& ]$ Q4 R; u
  19. };
    4 y7 g* Y0 A  j3 l6 I& c- R

  20. 9 j. _' j- H, z4 c
  21. $worker->onMessage = 'on_message';
    ( F. e/ N) g" j  d  @: d- h1 ^( w
  22. 0 p, q" ^1 ~2 L1 g& X. H- F
  23. function on_message($connection, $data)
    6 a; ]5 s2 U: j+ r
  24. {
    ( C' D4 H& M7 X+ L. {/ R
  25.     $connection->send("hello\n");# S3 @+ g0 j; z
  26. }
    / S# Y+ }' W: L

  27. # ~6 g5 _$ I/ W# {4 C
  28. // 运行worker" z! |/ ^6 C) K3 H; c$ q
  29. Worker::runAll();- L; |6 W! l* V% X2 |) r, u
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    : _1 Y- `* u2 M# u3 j

  31. ; c4 Y! ~' T3 ?$ F, A; s
  32. use Workerman\Worker;
    . H7 R" w$ {5 ^! l
  33. require_once './Workerman/Autoloader.php';
    3 _; g4 G. S: g: g7 l  D; a

  34. : j  {+ k6 w( d
  35. $worker = new Worker('text://0.0.0.0:2015');  D( s; ]$ g; J  y6 h3 z2 Z4 w
  36. // 4个进程
    ! F/ r% `" f  L5 ^& W
  37. $worker->count = 4;
    % Q& l* ?" {( G% G2 J/ c' k! J
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ! }7 [- j" L2 Q, A' R
  39. $worker->onWorkerStart = function($worker)
    - G7 b5 q; b2 Q# A+ ~3 O6 B
  40. {: R/ \% F3 v2 h) K; U" H' a! ?
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ! ^( d. [+ X  C5 G5 d* G+ P
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)4 |3 k+ f; F( v1 X% N! a, [+ e9 D' B
  43.     $inner_worker->reusePort = true;
    , R: E( J2 v5 I+ B6 Y8 B$ g
  44.     $inner_worker->onMessage = 'on_message';4 B5 c" }& m+ S$ [- I
  45.     // 执行监听。正常监听不会报错
    ' o4 d3 M* A  |. Q% A
  46.     $inner_worker->listen();  h" e# ]1 {# a5 l' ^1 c* r
  47. };
    9 l1 q/ }0 b: _7 t
  48. $ T  S& J4 b9 Q) }- T7 P7 R. M
  49. $worker->onMessage = 'on_message';
    & Z  q! e( c- r/ E0 i5 A

  50. : Y6 n/ F& `' a: O
  51. function on_message($connection, $data)
    : z- N8 S9 F* q' J" e
  52. {
    2 S1 q& d& h) K5 _
  53.     $connection->send("hello\n");
    # f8 p3 Z) u' x% v$ G  u
  54. }: h9 q! `! \! d/ E$ ?
  55.   t  H4 w7 J4 M# q# t. n
  56. // 运行worker- B1 P! X/ n( h) \! n4 Q
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php2 V, n4 ?  F, C
  2. use Workerman\Worker;
    . p& X6 T/ p; [! U9 u! e
  3. require_once './Workerman/Autoloader.php';
    ' r& f/ i2 Y0 q# I% d  s" V% n
  4. // 初始化一个worker容器,监听1234端口
    - ]3 b0 w6 {5 V7 `7 `
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    + Z; x; N* A- o. n4 k
  6. ' c" G# C& W( I
  7. /*
    & t) V6 z8 U7 w7 o  H' \
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    . \# I/ C- ~5 w7 W2 O
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    " O- }0 o4 H5 D
  10. */# X, P% ~+ M% f
  11. $worker->count = 1;
    4 U6 @% V; K: p2 V- W
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    0 A0 b$ Y$ @! {( S, X0 ^2 K9 h" Q
  13. $worker->onWorkerStart = function($worker)! U; B+ Z; L4 o
  14. {, f5 z% E: f# H8 D* C
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ X8 d* `; c# S* |+ U  f
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');1 f; `  r. C' t/ q* _* N
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    ; Y* w6 U# ?8 Z$ z2 v
  18.     {. c" c! c. v: S0 u
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据2 m/ J. Y. Q- J$ Q! N% |
  20.         $data = json_decode($buffer, true);
    % m; e7 `4 x( t* r# {# v8 E
  21.         $uid = $data['uid'];' F* X2 z; W1 _- D  n! _
  22.         // 通过workerman,向uid的页面推送数据$ b6 S' t0 {3 b. Q0 Z( F# R
  23.         $ret = sendMessageByUid($uid, $buffer);
    % O& s8 m6 n7 I
  24.         // 返回推送结果& ]2 N* }' g/ H6 v# F, Q
  25.         $connection->send($ret ? 'ok' : 'fail');/ Y* s: w" Y- p" [6 u
  26.     };
    # h0 }8 P# p0 H; D. C( j) [
  27.     // ## 执行监听 ##* S' w) L+ }9 m8 e7 w8 g
  28.     $inner_text_worker->listen();
    ' R2 E4 P; @0 y9 W4 k) z
  29. };
    $ E, v: z3 T- Z: W
  30. // 新增加一个属性,用来保存uid到connection的映射. K: {0 d" m) a
  31. $worker->uidConnections = array();' _% u9 E$ G& P! g" d
  32. // 当有客户端发来消息时执行的回调函数
    " \$ |0 a/ a0 D$ f( ~  Q
  33. $worker->onMessage = function($connection, $data)) D/ V# [* @4 }& G7 D
  34. {
    9 O& ]% B, S: v% J# j) a% h
  35.     global $worker;
    2 w  o5 y! W, S1 f2 L: T
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    * K+ C0 x' h! _6 ]% k* z
  37.     if(!isset($connection->uid))
    . R! x' E% s# |7 G' N
  38.     {
    $ L2 X- u* @2 Z8 W# P
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    0 i5 V/ s+ v. E3 X
  40.        $connection->uid = $data;" ~% B9 y& Q5 G- s( Q8 N
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,( c; K6 h' a7 Q) ^
  42.         * 实现针对特定uid推送数据4 I6 R- C, E3 J4 }4 J
  43.         */- w' Z% O. E# X# y, e6 q
  44.        $worker->uidConnections[$connection->uid] = $connection;( t( [: t0 ?6 Q; W% V
  45.        return;
    . U' W$ |+ ]# G2 [( V* \% o
  46.     }2 v( E4 N: a8 I
  47. };
    1 ]4 g" k' k* M; f& D* u: j

  48. # F9 O# u+ _# R0 J
  49. // 当有客户端连接断开时7 }9 t3 M6 d4 j$ r" [; r& X
  50. $worker->onClose = function($connection)
    & l2 H+ ^% J% I- o4 y
  51. {1 J. \! K7 a2 O) i9 D. d: M
  52.     global $worker;
    ; g5 V# n8 c' y7 W
  53.     if(isset($connection->uid))# E0 ~, {+ W6 {8 q! c  E6 {# I
  54.     {, y% F. a: [" L
  55.         // 连接断开时删除映射
    % b3 F- h% k; J/ \6 k
  56.         unset($worker->uidConnections[$connection->uid]);' B% g$ }3 ?/ ^5 I: L
  57.     }9 i- E" o/ M; B& W% g2 r
  58. };
    ! Q  m/ R. e3 E" g- n
  59. % f4 M5 \- D6 Q$ _  y7 f
  60. // 向所有验证的用户推送数据
    0 ?( F8 F* I3 w) e
  61. function broadcast($message)5 }; p( p: n+ z/ q% `" X; X
  62. {
    9 r6 J0 N" l' W- n+ T$ `. `
  63.    global $worker;
      u$ t5 O4 N8 e' O
  64.    foreach($worker->uidConnections as $connection)" l9 v1 a6 t- G) m% c, \/ g
  65.    {
    7 q7 s. c: J  p3 U
  66.         $connection->send($message);
    - v# ^* B* I; z5 X% n9 G6 _
  67.    }3 r4 ?$ ?6 k% v' ?/ p5 C0 ^
  68. }
    7 L( g5 F" X% R4 K0 `# S& l- `
  69. 9 q+ U  t' Y8 b0 }4 d" f  P
  70. // 针对uid推送数据
    - ~2 Q& u! |* ^" e* r5 E, U
  71. function sendMessageByUid($uid, $message)/ D- c' q8 A3 l5 _5 h
  72. {! W# _) i; J9 m1 q1 ^5 \5 ^
  73.     global $worker;
    : ?0 z9 p  M- ~3 w% T9 z+ y6 N
  74.     if(isset($worker->uidConnections[$uid]))
    . _* t6 e3 W- k) M0 d8 Q
  75.     {
      D, C& @# @; f% t. }
  76.         $connection = $worker->uidConnections[$uid];8 C) q% v7 r; e2 o4 G9 {$ Y( G$ a
  77.         $connection->send($message);
    ! n$ y' r, [! c1 N; I
  78.         return true;
    6 A. w4 S  {+ n! \! d
  79.     }) s, |/ c& H" L9 m
  80.     return false;
    2 b  F. ?9 R( j0 P& g+ {
  81. }! a& e" q( z& f0 @
  82. ' [$ z" k6 G  B- ~0 u; P
  83. // 运行所有的worker
      u5 Q4 F: n/ ^! h
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');$ d! r0 D1 v& ^. c5 U" d
  2. ws.onopen = function(){
    5 S. G, {3 M; P0 ^
  3.     var uid = 'uid1';6 K3 q8 |0 z2 \3 W2 S% u9 s0 x
  4.     ws.send(uid);- ?4 Y$ j4 M6 O/ i1 M* T
  5. };
    . E( @0 O" G- v: {3 T7 W
  6. ws.onmessage = function(e){
    5 R( A7 [) y2 `; H. Z
  7.     alert(e.data);
    2 A; b* F$ F! [" i
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口. z2 T2 w" _. j3 H* i1 e$ c* f
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);7 Q0 t# z; _( a% ~  J# \
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    7 c6 w0 E9 j; ~
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    6 s0 k3 w2 P4 M8 C  H) W$ J7 r# u( {
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符. ]8 o6 O, A6 u, H7 W( z5 @
  6. fwrite($client, json_encode($data)."\n");
    + A$ F7 t1 [8 ]2 j7 z) S
  7. // 读取推送结果7 u6 u/ _! @* a/ P; M+ h& c# X
  8. echo fread($client, 8192);
复制代码
8 o0 y5 P( M1 Y0 M# ?3 e

) S; u6 R0 Q* z# @
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-20 05:39 , Processed in 0.075903 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!